{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e484e6a3",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | default_exp _components.encoder.avro"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "330229f3",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/home/kumaran/.local/lib/python3.11/site-packages/pydantic/_internal/_config.py:257: UserWarning: Valid config keys have changed in V2:\n",
      "* 'json_encoders' has been removed\n",
      "  warnings.warn(message, UserWarning)\n"
     ]
    }
   ],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "import io\n",
    "import json\n",
    "from typing import *\n",
    "\n",
    "import fastavro\n",
    "from pydantic import BaseModel, create_model\n",
    "\n",
    "from fastkafka._components.logger import get_logger\n",
    "from fastkafka._components.meta import export"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "84ed548a",
   "metadata": {},
   "outputs": [],
   "source": [
    "import tempfile\n",
    "\n",
    "import pytest\n",
    "from pydantic import Field\n",
    "\n",
    "from fastkafka._components.logger import suppress_timestamps"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "84c22900",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "40c6f220",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "logger = get_logger(__name__)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1e61e48a",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] __main__: ok\n"
     ]
    }
   ],
   "source": [
    "suppress_timestamps()\n",
    "logger = get_logger(__name__, level=20)\n",
    "logger.info(\"ok\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d561af81",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b19543e7",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "@export(\"fastkafka.encoder\")\n",
    "class AvroBase(BaseModel):\n",
    "    \"\"\"Base pydantic class that adds Avro schema generation helpers.\n",
    "\n",
    "    The Avro schema is derived from the JSON schema that pydantic generates for a model.\n",
    "    \"\"\"\n",
    "\n",
    "    @classmethod\n",
    "    def avro_schema_for_pydantic_object(\n",
    "        cls,\n",
    "        pydantic_model: BaseModel,\n",
    "        by_alias: bool = True,\n",
    "        namespace: Optional[str] = None,\n",
    "    ) -> Dict[str, Any]:\n",
    "        \"\"\"\n",
    "        Returns the Avro schema for the given Pydantic object.\n",
    "\n",
    "        Args:\n",
    "            pydantic_model (BaseModel): The Pydantic object.\n",
    "            by_alias (bool, optional): Generate schemas using aliases defined. Defaults to True.\n",
    "            namespace (Optional[str], optional): Optional namespace string for schema generation.\n",
    "                When omitted, the title of the generated JSON schema is used.\n",
    "\n",
    "        Returns:\n",
    "            Dict[str, Any]: The Avro schema for the model.\n",
    "        \"\"\"\n",
    "        # The schema depends only on the class of the object, so reuse the class based helper\n",
    "        return cls.avro_schema_for_pydantic_class(\n",
    "            pydantic_model.__class__, by_alias=by_alias, namespace=namespace\n",
    "        )\n",
    "\n",
    "    @classmethod\n",
    "    def avro_schema_for_pydantic_class(\n",
    "        cls,\n",
    "        pydantic_model: Type[BaseModel],\n",
    "        by_alias: bool = True,\n",
    "        namespace: Optional[str] = None,\n",
    "    ) -> Dict[str, Any]:\n",
    "        \"\"\"\n",
    "        Returns the Avro schema for the given Pydantic class.\n",
    "\n",
    "        Args:\n",
    "            pydantic_model (Type[BaseModel]): The Pydantic class.\n",
    "            by_alias (bool, optional): Generate schemas using aliases defined. Defaults to True.\n",
    "            namespace (Optional[str], optional): Optional namespace string for schema generation.\n",
    "                When omitted, the title of the generated JSON schema is used.\n",
    "\n",
    "        Returns:\n",
    "            Dict[str, Any]: The Avro schema for the model.\n",
    "        \"\"\"\n",
    "\n",
    "        schema = pydantic_model.model_json_schema(by_alias=by_alias)\n",
    "\n",
    "        if namespace is None:\n",
    "            # default namespace will be based on title\n",
    "            namespace = schema[\"title\"]\n",
    "\n",
    "        return cls._avro_schema(schema, namespace)\n",
    "\n",
    "    @classmethod\n",
    "    def avro_schema(\n",
    "        cls, by_alias: bool = True, namespace: Optional[str] = None\n",
    "    ) -> Dict[str, Any]:\n",
    "        \"\"\"\n",
    "        Returns the Avro schema for the Pydantic class.\n",
    "\n",
    "        Args:\n",
    "            by_alias (bool, optional): Generate schemas using aliases defined. Defaults to True.\n",
    "            namespace (Optional[str], optional): Optional namespace string for schema generation.\n",
    "                When omitted, the title of the generated JSON schema is used.\n",
    "\n",
    "        Returns:\n",
    "            Dict[str, Any]: The Avro schema for the model.\n",
    "        \"\"\"\n",
    "        # Uses model_json_schema() through the class based helper instead of the\n",
    "        # deprecated BaseModel.schema()\n",
    "        return cls.avro_schema_for_pydantic_class(\n",
    "            cls, by_alias=by_alias, namespace=namespace\n",
    "        )\n",
    "\n",
    "    @staticmethod\n",
    "    def _avro_schema(schema: Dict[str, Any], namespace: str) -> Dict[str, Any]:\n",
    "        \"\"\"\n",
    "        Returns the Avro schema for the given pydantic JSON schema.\n",
    "\n",
    "        Args:\n",
    "            schema (Dict[str, Any]): JSON schema generated by pydantic for a model.\n",
    "            namespace (str): Namespace to set on the resulting Avro record.\n",
    "\n",
    "        Returns:\n",
    "            Dict[str, Any]: The Avro record schema.\n",
    "\n",
    "        Raises:\n",
    "            RuntimeError: If a referenced definition is missing from the schema.\n",
    "            NotImplementedError: If a field uses a JSON schema type that is not supported.\n",
    "        \"\"\"\n",
    "\n",
    "        # Named types that were already emitted; later usages refer to them by name\n",
    "        classes_seen: Set[str] = set()\n",
    "\n",
    "        # pydantic v2 stores nested definitions under \"$defs\", older versions used \"definitions\"\n",
    "        definition_keys = (\"$defs\", \"definitions\")\n",
    "\n",
    "        def get_ref_name(ref: str) -> str:\n",
    "            \"\"\"Strips the JSON pointer prefix from a reference and returns the bare name\"\"\"\n",
    "            for definition_key in definition_keys:\n",
    "                prefix = f\"#/{definition_key}/\"\n",
    "                if ref.startswith(prefix):\n",
    "                    return ref[len(prefix) :]\n",
    "            return ref\n",
    "\n",
    "        def get_definition(ref: str, schema: Dict[str, Any]) -> Dict[str, Any]:\n",
    "            \"\"\"Reading definition of base schema for nested structs\"\"\"\n",
    "            id = get_ref_name(ref)\n",
    "            for definition_key in definition_keys:\n",
    "                d = schema.get(definition_key, {}).get(id)\n",
    "                if d is not None:\n",
    "                    return d  # type: ignore\n",
    "            raise RuntimeError(f\"Definition {id} does not exist\")\n",
    "\n",
    "        def get_type(value: Dict[str, Any]) -> Dict[str, Any]:\n",
    "            \"\"\"Returns a type of a single field\"\"\"\n",
    "            t = value.get(\"type\")\n",
    "            f = value.get(\"format\")\n",
    "            r = value.get(\"$ref\")\n",
    "            a = value.get(\"additionalProperties\")\n",
    "            avro_type_dict: Dict[str, Any] = {}\n",
    "            if \"default\" in value:\n",
    "                avro_type_dict[\"default\"] = value.get(\"default\")\n",
    "            if \"description\" in value:\n",
    "                avro_type_dict[\"doc\"] = value.get(\"description\")\n",
    "            # A reference that carries extra keywords may be wrapped in a single element allOf\n",
    "            if \"allOf\" in value and len(value[\"allOf\"]) == 1:\n",
    "                r = value[\"allOf\"][0].get(\"$ref\")\n",
    "            if r is not None:\n",
    "                class_name = get_ref_name(r)\n",
    "                if class_name in classes_seen:\n",
    "                    avro_type_dict[\"type\"] = class_name\n",
    "                else:\n",
    "                    d = get_definition(r, schema)\n",
    "                    # Register the name before descending so that a definition which\n",
    "                    # refers to itself is emitted by name instead of recursing forever\n",
    "                    classes_seen.add(class_name)\n",
    "                    if \"enum\" in d:\n",
    "                        avro_type_dict[\"type\"] = {\n",
    "                            \"type\": \"enum\",\n",
    "                            \"symbols\": [str(v) for v in d[\"enum\"]],\n",
    "                            \"name\": d[\"title\"],\n",
    "                        }\n",
    "                    else:\n",
    "                        avro_type_dict[\"type\"] = {\n",
    "                            \"type\": \"record\",\n",
    "                            \"fields\": get_fields(d),\n",
    "                            # Name of the struct should be unique throughout the complete schema,\n",
    "                            # the name of the referenced definition is used for that\n",
    "                            \"name\": class_name,\n",
    "                        }\n",
    "            elif t == \"array\":\n",
    "                items = value.get(\"items\")\n",
    "                tn = get_type(items)  # type: ignore\n",
    "                # If items in array are a object:\n",
    "                if \"$ref\" in items:  # type: ignore\n",
    "                    tn = tn[\"type\"]\n",
    "                # If items in array are a logicalType\n",
    "                if (\n",
    "                    isinstance(tn, dict)\n",
    "                    and isinstance(tn.get(\"type\", {}), dict)\n",
    "                    and tn.get(\"type\", {}).get(\"logicalType\") is not None\n",
    "                ):\n",
    "                    tn = tn[\"type\"]\n",
    "                avro_type_dict[\"type\"] = {\"type\": \"array\", \"items\": tn}\n",
    "            elif t == \"string\" and f == \"date-time\":\n",
    "                avro_type_dict[\"type\"] = {\n",
    "                    \"type\": \"long\",\n",
    "                    \"logicalType\": \"timestamp-micros\",\n",
    "                }\n",
    "            elif t == \"string\" and f == \"date\":\n",
    "                avro_type_dict[\"type\"] = {\n",
    "                    \"type\": \"int\",\n",
    "                    \"logicalType\": \"date\",\n",
    "                }\n",
    "            elif t == \"string\" and f == \"time\":\n",
    "                avro_type_dict[\"type\"] = {\n",
    "                    \"type\": \"long\",\n",
    "                    \"logicalType\": \"time-micros\",\n",
    "                }\n",
    "            elif t == \"string\" and f == \"uuid\":\n",
    "                avro_type_dict[\"type\"] = {\n",
    "                    \"type\": \"string\",\n",
    "                    \"logicalType\": \"uuid\",\n",
    "                }\n",
    "            elif t == \"string\":\n",
    "                avro_type_dict[\"type\"] = \"string\"\n",
    "            elif t == \"number\":\n",
    "                avro_type_dict[\"type\"] = \"double\"\n",
    "            elif t == \"integer\":\n",
    "                # integer in python can be a long\n",
    "                avro_type_dict[\"type\"] = \"long\"\n",
    "            elif t == \"boolean\":\n",
    "                avro_type_dict[\"type\"] = \"boolean\"\n",
    "            elif t == \"object\":\n",
    "                if a is None:\n",
    "                    value_type = \"string\"\n",
    "                else:\n",
    "                    value_type = get_type(a)  # type: ignore\n",
    "                # Collapse {\"type\": X} to X, keep richer dictionaries untouched\n",
    "                if isinstance(value_type, dict) and len(value_type) == 1:\n",
    "                    value_type = value_type.get(\"type\")  # type: ignore\n",
    "                avro_type_dict[\"type\"] = {\"type\": \"map\", \"values\": value_type}\n",
    "            else:\n",
    "                raise NotImplementedError(\n",
    "                    f\"Type '{t}' not support yet, \"\n",
    "                    f\"please report this at https://github.com/godatadriven/pydantic-avro/issues\"\n",
    "                )\n",
    "            return avro_type_dict\n",
    "\n",
    "        def get_fields(s: Dict[str, Any]) -> List[Dict[str, Any]]:\n",
    "            \"\"\"Return a list of fields of a struct\"\"\"\n",
    "            fields = []\n",
    "\n",
    "            required = s.get(\"required\", [])\n",
    "            for key, raw_value in s.get(\"properties\", {}).items():\n",
    "                # Work on a shallow copy so that the schema passed in is never mutated\n",
    "                value = dict(raw_value)\n",
    "                if \"type\" not in value and \"anyOf\" in value:\n",
    "                    non_null = [\n",
    "                        x for x in value.pop(\"anyOf\") if x.get(\"type\") != \"null\"\n",
    "                    ]\n",
    "                    if not non_null:\n",
    "                        raise NotImplementedError(\n",
    "                            f\"Field '{key}' does not declare a non-null type\"\n",
    "                        )\n",
    "                    # Only the first non-null variant is supported. Merge it completely so\n",
    "                    # that \"format\", \"items\", \"$ref\" etc. are kept and not just \"type\"\n",
    "                    for variant_key, variant_value in non_null[0].items():\n",
    "                        value.setdefault(variant_key, variant_value)\n",
    "                avro_type_dict = get_type(value)\n",
    "                avro_type_dict[\"name\"] = key\n",
    "\n",
    "                if key not in required:\n",
    "                    # Fields without an explicit default become nullable with default null\n",
    "                    if avro_type_dict.get(\"default\") is None:\n",
    "                        avro_type_dict[\"type\"] = [\"null\", avro_type_dict[\"type\"]]\n",
    "                        avro_type_dict[\"default\"] = None\n",
    "\n",
    "                fields.append(avro_type_dict)\n",
    "            return fields\n",
    "\n",
    "        fields = get_fields(schema)\n",
    "\n",
    "        return {\n",
    "            \"type\": \"record\",\n",
    "            \"namespace\": namespace,\n",
    "            \"name\": schema[\"title\"],\n",
    "            \"fields\": fields,\n",
    "        }"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "97c98333",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Pydantic model used as a fixture by the checks in this notebook\n",
    "class User(BaseModel):\n",
    "    name: str\n",
    "    favorite_number: Optional[int] = None\n",
    "    favorite_color: Optional[str] = None\n",
    "\n",
    "\n",
    "# Avro schema that is expected to be generated for the `User` model:\n",
    "# optional fields become a union with \"null\" and default to None\n",
    "test_user_schema = dict(\n",
    "    type=\"record\",\n",
    "    namespace=\"User\",\n",
    "    name=\"User\",\n",
    "    fields=[\n",
    "        dict(type=\"string\", name=\"name\"),\n",
    "        dict(type=[\"null\", \"long\"], name=\"favorite_number\", default=None),\n",
    "        dict(type=[\"null\", \"string\"], name=\"favorite_color\", default=None),\n",
    "    ],\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "30d7c9a0",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'type': 'record',\n",
       " 'namespace': 'User',\n",
       " 'name': 'User',\n",
       " 'fields': [{'type': 'string', 'name': 'name'},\n",
       "  {'default': None, 'type': ['null', 'long'], 'name': 'favorite_number'},\n",
       "  {'default': None, 'type': ['null', 'string'], 'name': 'favorite_color'}]}"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "actual = AvroBase.avro_schema_for_pydantic_class(User)\n",
    "display(actual)\n",
    "assert actual == test_user_schema\n",
    "\n",
    "# The instance based helper has to generate the same schema as the class based one\n",
    "assert (\n",
    "    AvroBase.avro_schema_for_pydantic_object(User(name=\"Kumaran\")) == test_user_schema\n",
    ")\n",
    "\n",
    "# An explicit namespace replaces the default one, which is taken from the schema title\n",
    "assert (\n",
    "    AvroBase.avro_schema_for_pydantic_class(User, namespace=\"custom\")[\"namespace\"]\n",
    "    == \"custom\"\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "934835d9",
   "metadata": {},
   "outputs": [],
   "source": [
    "# ToDo\n",
    "# 1. Rewrite with fastavro - Done\n",
    "# 2. Generate schema from pydantic itself - Done\n",
    "#        - Pydantic to avro schema conversion methods - Done\n",
    "# 3. Generate pydantic class from avro schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "659da394",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "@export(\"fastkafka.encoder\")\n",
    "def avro_encoder(msg: BaseModel) -> bytes:\n",
    "    \"\"\"\n",
    "    Encoder to encode pydantic instances to avro message\n",
    "\n",
    "    The Avro schema is generated from the class of the given instance and the message\n",
    "    is written without a schema header (schemaless encoding).\n",
    "\n",
    "    Args:\n",
    "        msg: An instance of pydantic basemodel\n",
    "\n",
    "    Returns:\n",
    "        A bytes message which is encoded from pydantic basemodel\n",
    "    \"\"\"\n",
    "    schema = fastavro.schema.parse_schema(AvroBase.avro_schema_for_pydantic_object(msg))\n",
    "\n",
    "    # The schema above is generated with field aliases (by_alias defaults to True),\n",
    "    # so the payload has to be dumped with the same keys to match the schema.\n",
    "    # Values of pydantic_core types are converted to their string representation.\n",
    "    d = {\n",
    "        k: str(v) if \"pydantic_core\" in str(type(v)) else v\n",
    "        for k, v in msg.model_dump(by_alias=True).items()\n",
    "    }\n",
    "\n",
    "    with io.BytesIO() as bytes_writer:\n",
    "        fastavro.schemaless_writer(bytes_writer, schema, d)\n",
    "        return bytes_writer.getvalue()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7a89f790",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "b'\\x0eKumaran\\x02\\x12\\x02\\nblack'"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "msg = User(name=\"Kumaran\", favorite_number=9, favorite_color=\"black\")\n",
    "\n",
    "\n",
    "actual = avro_encoder(msg)\n",
    "display(actual)\n",
    "\n",
    "assert isinstance(actual, bytes)\n",
    "assert actual == b\"\\x0eKumaran\\x02\\x12\\x02\\nblack\"\n",
    "\n",
    "# Optional fields that are not set are written as the \"null\" branch of the union\n",
    "assert avro_encoder(User(name=\"Kumaran\")) == b\"\\x0eKumaran\\x00\\x00\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cdbdbb80",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "@export(\"fastkafka.encoder\")\n",
    "def avro_decoder(raw_msg: bytes, cls: Type[BaseModel]) -> Any:\n",
    "    \"\"\"\n",
    "    Decoder to decode avro encoded messages to pydantic model instance\n",
    "\n",
    "    The Avro schema used for reading is generated from the given pydantic class.\n",
    "\n",
    "    Args:\n",
    "        raw_msg: Avro encoded bytes message received from Kafka topic\n",
    "        cls: Pydantic class; This pydantic class will be used to construct instance of same class\n",
    "\n",
    "    Returns:\n",
    "        An instance of given pydantic class\n",
    "    \"\"\"\n",
    "    avro_schema = AvroBase.avro_schema_for_pydantic_class(cls)\n",
    "    parsed_schema = fastavro.schema.parse_schema(avro_schema)\n",
    "\n",
    "    # Read the schemaless payload into a plain dictionary and let pydantic validate it\n",
    "    decoded_fields = fastavro.schemaless_reader(io.BytesIO(raw_msg), parsed_schema)\n",
    "    return cls(**decoded_fields)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c98ae71a",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "User(name='123', favorite_number=0, favorite_color='111')"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "raw_msg = b\"\\x06123\\x02\\x00\\x02\\x06111\"\n",
    "\n",
    "\n",
    "actual = avro_decoder(raw_msg, cls=User)\n",
    "display(actual)\n",
    "\n",
    "assert isinstance(actual, User)\n",
    "assert actual.name == \"123\"\n",
    "assert actual.favorite_number == 0\n",
    "assert actual.favorite_color == \"111\"\n",
    "\n",
    "# Encoding the decoded instance has to reproduce the original message (round trip)\n",
    "assert avro_encoder(actual) == raw_msg"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7c9dce67",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "497b7549",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "@export(\"fastkafka.encoder\")\n",
    "def avsc_to_pydantic(schema: Dict[str, Any]) -> Type[BaseModel]:\n",
    "    \"\"\"\n",
    "    Generate pydantic model from given Avro Schema\n",
    "\n",
    "    Args:\n",
    "        schema: Avro schema in dictionary format\n",
    "\n",
    "    Returns:\n",
    "        Pydantic model class built from given avro schema\n",
    "\n",
    "    Raises:\n",
    "        AttributeError: If the schema is not a record or misses its name or fields\n",
    "        ValueError: If a record of the schema has no fields\n",
    "        NotImplementedError: If the schema uses a type that is not supported\n",
    "    \"\"\"\n",
    "    if \"type\" not in schema or schema[\"type\"] != \"record\":\n",
    "        raise AttributeError(\"Type not supported\")\n",
    "    if \"name\" not in schema:\n",
    "        raise AttributeError(\"Name is required\")\n",
    "    if \"fields\" not in schema:\n",
    "        raise AttributeError(\"fields are required\")\n",
    "\n",
    "    # Source text of every named type (record or enum) seen so far, keyed by its name\n",
    "    classes: Dict[str, str] = {}\n",
    "\n",
    "    def get_python_type(t: Union[str, List[Any], Dict[str, Any]]) -> str:\n",
    "        \"\"\"Returns python type for given avro type\"\"\"\n",
    "        optional = False\n",
    "        if isinstance(t, str):\n",
    "            if t == \"string\":\n",
    "                py_type = \"str\"\n",
    "            elif t == \"long\" or t == \"int\":\n",
    "                py_type = \"int\"\n",
    "            elif t == \"boolean\":\n",
    "                py_type = \"bool\"\n",
    "            elif t == \"double\" or t == \"float\":\n",
    "                py_type = \"float\"\n",
    "            elif t in classes:\n",
    "                py_type = t\n",
    "            else:\n",
    "                raise NotImplementedError(f\"Type {t} not supported yet\")\n",
    "        elif isinstance(t, list):\n",
    "            # A union is only supported as \"null\" plus one other type\n",
    "            non_null = [x for x in t if x != \"null\"]\n",
    "            optional = len(non_null) != len(t)\n",
    "            if len(t) > 2 or (not optional and len(t) > 1):\n",
    "                raise NotImplementedError(\"Only a single type ia supported yet\")\n",
    "            # Filtering instead of list.remove() keeps a union without \"null\",\n",
    "            # such as [\"string\"], from failing with a ValueError\n",
    "            if not non_null:\n",
    "                raise NotImplementedError(f\"Type {t} not supported yet\")\n",
    "            py_type = get_python_type(non_null[0])\n",
    "        elif t.get(\"logicalType\") == \"uuid\":\n",
    "            py_type = \"UUID\"\n",
    "        elif t.get(\"logicalType\") == \"decimal\":\n",
    "            py_type = \"Decimal\"\n",
    "        elif (\n",
    "            t.get(\"logicalType\") == \"timestamp-millis\"\n",
    "            or t.get(\"logicalType\") == \"timestamp-micros\"\n",
    "        ):\n",
    "            py_type = \"datetime\"\n",
    "        elif (\n",
    "            t.get(\"logicalType\") == \"time-millis\"\n",
    "            or t.get(\"logicalType\") == \"time-micros\"\n",
    "        ):\n",
    "            py_type = \"time\"\n",
    "        elif t.get(\"logicalType\") == \"date\":\n",
    "            py_type = \"date\"\n",
    "        elif t.get(\"type\") == \"enum\":\n",
    "            enum_name = t.get(\"name\")\n",
    "            if enum_name not in classes:\n",
    "                enum_class = f\"class {enum_name}(str, Enum):\\n\"\n",
    "                for s in t.get(\"symbols\"):  # type: ignore\n",
    "                    enum_class += f'    {s} = \"{s}\"\\n'\n",
    "                classes[enum_name] = enum_class\n",
    "            py_type = enum_name  # type: ignore\n",
    "        elif t.get(\"type\") == \"string\":\n",
    "            py_type = \"str\"\n",
    "        elif t.get(\"type\") == \"array\":\n",
    "            sub_type = get_python_type(t.get(\"items\"))  # type: ignore\n",
    "            py_type = f\"List[{sub_type}]\"\n",
    "        elif t.get(\"type\") == \"record\":\n",
    "            # Registers the nested record in `classes` as a side effect\n",
    "            record_type_to_pydantic(t)\n",
    "            py_type = t.get(\"name\")  # type: ignore\n",
    "        elif t.get(\"type\") == \"map\":\n",
    "            value_type = get_python_type(t.get(\"values\"))  # type: ignore\n",
    "            py_type = f\"Dict[str, {value_type}]\"\n",
    "        else:\n",
    "            raise NotImplementedError(\n",
    "                f\"Type {t} not supported yet, \"\n",
    "                f\"please report this at https://github.com/godatadriven/pydantic-avro/issues\"\n",
    "            )\n",
    "        if optional:\n",
    "            return f\"Optional[{py_type}]\"\n",
    "        else:\n",
    "            return py_type\n",
    "\n",
    "    def record_type_to_pydantic(schema: Dict[str, Any]) -> Type[BaseModel]:\n",
    "        \"\"\"Convert a single avro record type to a pydantic class\"\"\"\n",
    "        # Drop a namespace prefix (\"a.b.Name\" -> \"Name\") for the name of the model\n",
    "        name = (\n",
    "            schema[\"name\"]\n",
    "            if \".\" not in schema[\"name\"]\n",
    "            else schema[\"name\"].split(\".\")[-1]\n",
    "        )\n",
    "        current = f\"class {schema['name']}(BaseModel):\\n\"\n",
    "\n",
    "        # Field definitions in the (type, default) form accepted by create_model\n",
    "        kwargs: Dict[str, Tuple[str, Any]] = {}\n",
    "\n",
    "        if len(schema[\"fields\"]) == 0:\n",
    "            raise ValueError(\"Avro schema has no fields\")\n",
    "\n",
    "        for field in schema[\"fields\"]:\n",
    "            n = field[\"name\"]\n",
    "            t = get_python_type(field[\"type\"])\n",
    "            default = field.get(\"default\")\n",
    "            if \"default\" not in field:\n",
    "                # Ellipsis marks the field as required\n",
    "                kwargs[n] = (t, ...)\n",
    "                current += f\"    {n}: {t}\\n\"\n",
    "            elif isinstance(default, (bool, type(None))):\n",
    "                kwargs[n] = (t, default)\n",
    "                current += f\"    {n}: {t} = {default}\\n\"\n",
    "            else:\n",
    "                kwargs[n] = (t, default)\n",
    "                current += f\"    {n}: {t} = {json.dumps(default)}\\n\"\n",
    "\n",
    "        classes[name] = current\n",
    "        pydantic_model = create_model(name, __module__=__name__, **kwargs)  # type: ignore\n",
    "        return pydantic_model  # type: ignore\n",
    "\n",
    "    return record_type_to_pydantic(schema)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8dbc3c30",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'type': 'record',\n",
       " 'namespace': 'User',\n",
       " 'name': 'User',\n",
       " 'fields': [{'type': 'string', 'name': 'name'},\n",
       "  {'default': None, 'type': ['null', 'long'], 'name': 'favorite_number'},\n",
       "  {'default': None, 'type': ['null', 'string'], 'name': 'favorite_color'}]}"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "__main__.User"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "{'name': FieldInfo(annotation=str, required=True),\n",
       " 'favorite_number': FieldInfo(annotation=Union[int, NoneType], required=False),\n",
       " 'favorite_color': FieldInfo(annotation=Union[str, NoneType], required=False)}"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "User(name='Kumaran', favorite_number=9, favorite_color='black')"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "User(name='Kumaran', favorite_number=9, favorite_color='black')"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "user_schema = AvroBase.avro_schema_for_pydantic_class(User)\n",
    "display(user_schema)\n",
    "\n",
    "A = avsc_to_pydantic(user_schema)\n",
    "display(A)\n",
    "display(A.model_fields)\n",
    "# The generated class has to be a pydantic model with the fields of the schema\n",
    "assert issubclass(A, BaseModel)\n",
    "assert list(A.model_fields.keys()) == [\"name\", \"favorite_number\", \"favorite_color\"]\n",
    "\n",
    "# Instances of the generated model have to behave like instances of the original model\n",
    "a = A(name=\"Kumaran\", favorite_number=\"9\", favorite_color=\"black\")\n",
    "u = User(\n",
    "    name=\"Kumaran\", favorite_number=\"9\", favorite_color=\"black\"\n",
    ")\n",
    "display(a)\n",
    "display(u)\n",
    "assert a.name == u.name\n",
    "assert a.favorite_number == u.favorite_number\n",
    "assert a.favorite_color == u.favorite_color"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "68efa351",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'type': 'record', 'namespace': 'User', 'name': 'User', 'fields': []}"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "<ExceptionInfo ValueError('Avro schema has no fields') tblen=3>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "user_schema = AvroBase.avro_schema_for_pydantic_class(User)\n",
    "user_schema[\"fields\"] = []\n",
    "\n",
    "display(user_schema)\n",
    "\n",
    "# A record without fields is rejected; the message is pinned so that an unrelated\n",
    "# ValueError cannot make this check pass\n",
    "with pytest.raises(ValueError, match=\"Avro schema has no fields\") as e:\n",
    "    avsc_to_pydantic(user_schema)\n",
    "display(e)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9cf574fc",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "python3",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
